package net.gdface.facelog;

import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.SimpleAuthenticationPlugin;

import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import gu.simplemq.MQLocationType;
import gu.simplemq.MessageQueueFactorys;
import gu.simplemq.MessageQueueType;
import gu.simplemq.utils.URISupport;

import static com.google.common.base.Preconditions.*;
import static gu.dtalk.activemq.ActivemqContext.HELPER;
import static com.google.common.base.MoreObjects.firstNonNull;

/**
 * Controller for a local (embedded) ActiveMQ broker.<br>
 * The constructor resolves the connection properties that are later handed to the
 * message queue factory, and {@link #startEmbeddedServer()} starts an in-process,
 * non-persistent {@link BrokerService} that exposes the configured transports.
 * @author guyadong
 *
 */
class ActivemqControllerEmbedded extends ActivemqController  {
	/** URI scheme of the native ActiveMQ transport; connectors using it are named {@code PROTOCOL_OPENWIRE}. */
	public static final String DEFAULT_AMQ_SCHEMA = "tcp";
	/** WebSocket connector used when the connection parameters do not define one. */
	public static final String DEFAULT_WS_CONNECTOR = "ws://localhost:61614";
	/** Placeholder written to the log instead of a secret value. */
	private static final String MASKED_SECRET = "******";
	/**
	 * Builds the controller and fills {@code initializedProperties} with the broker URI,
	 * the publish/subscribe URI, the WebSocket URI and, when both are non-empty,
	 * the user name and password.
	 * @param connParameters connection parameters, passed unchanged to the super class
	 */
	@SuppressWarnings({ "rawtypes" })
	protected ActivemqControllerEmbedded(Map connParameters) {
		super(connParameters);
		String pubsub = getPubsubLocation();
		String ws = firstNonNull(HELPER.with(MQLocationType.WS).getLocationAsString(originConnParameters), DEFAULT_WS_CONNECTOR);
		initializedProperties.put(MQ_URI, location.toString());
		initializedProperties.put(MQ_PUBSUB_URI, pubsub);
		initializedProperties.put(MQ_PUBSUB_MQTT, Boolean.TRUE.toString());
		initializedProperties.put(MQ_WS_URI, ws);
		// Credentials are only recorded as a pair: a lone user name or password is dropped.
		if(!Strings.isNullOrEmpty(getUsername()) && !Strings.isNullOrEmpty(getPassword())){
			initializedProperties.put(MQ_USERNAME, getUsername());
			initializedProperties.put(MQ_PASSWORD, getPassword());
		}
	}
	/**
	 * Returns the publish/subscribe location URI string from the connection parameters.
	 * Falls back to {@code DEFAULT_MQTT_CONNECTOR} when no location is defined or when the
	 * defined one does not start with {@code MQTT_SCHEMA}.
	 * @return publish/subscribe location URI string
	 */
	private String getPubsubLocation(){
		String pubsub = HELPER.with(MQLocationType.PUBSUB).getLocationAsString(originConnParameters);
		if(pubsub == null || !pubsub.startsWith(MQTT_SCHEMA)){
			pubsub = DEFAULT_MQTT_CONNECTOR;
		}
		return pubsub;
	}
	/**
	 * Replaces a secret with a fixed mask so that it never reaches the log.
	 * @param secret the secret value, may be {@code null}
	 * @return {@code null} when {@code secret} is {@code null}, otherwise the mask
	 */
	private static String maskSecret(Object secret){
		return secret == null ? null : MASKED_SECRET;
	}
	/**
	 * Creates and starts the embedded broker.
	 * The broker is non-persistent, has JMX enabled without a JMX connector, and listens on
	 * the broker, publish/subscribe and WebSocket locations with the host rewritten to
	 * {@code 0.0.0.0}. Simple authentication is installed when both user name and password
	 * are present in {@code initializedProperties}.
	 * @return the started {@link BrokerService}
	 * @throws IllegalStateException if the connection parameters or properties are uninitialized
	 * @throws RuntimeException wrapping any checked exception raised while configuring or starting the broker
	 */
	@Override
	protected Object startEmbeddedServer() {
		try {
			checkState(originConnParameters != null,"originConnParameters is uninitialized");
			checkState(initializedProperties != null,"initializedProperties is uninitialized");

			ServiceConstant.logger.info("Start embedded activemq server(启动嵌入式activemq服务)");
			BrokerService broker = new BrokerService();
			broker.setUseJmx(true);
			broker.setPersistent(false);
			// A set is used so that identical locations result in a single connector.
			HashSet<URI> connectors = Sets.newHashSet(
					HELPER.getLocation(initializedProperties),
					URI.create(getPubsubLocation())
					,firstNonNull(HELPER.with(MQLocationType.WS).getLocation(originConnParameters), URI.create(DEFAULT_WS_CONNECTOR))
					);
			
			for(URI uri:connectors){
				// Bind every transport to all local interfaces.
				uri = URISupport.changeHost(uri, "0.0.0.0");
				ServiceConstant.logger.info("WITH TRANSPORT {}",uri);
				// Constant-first comparison: a scheme-less URI must not raise a NullPointerException here.
				String name = DEFAULT_AMQ_SCHEMA.equals(uri.getScheme()) ? PROTOCOL_OPENWIRE : uri.getScheme();
				broker.addConnector(uri).setName(name);
			}
			List<BrokerPlugin> plugins = Lists.newArrayList();
			if(initializedProperties.containsKey(MQ_USERNAME) && initializedProperties.containsKey(MQ_PASSWORD)){
				// Define the login user name and password.
				AuthenticationUser user = new AuthenticationUser(
						(String)initializedProperties.get(MQ_USERNAME),
						(String)initializedProperties.get(MQ_PASSWORD),
						"users");
				plugins.add(new SimpleAuthenticationPlugin(Arrays.asList(user)));
				// The password is masked: credentials must not be written to the log in clear text.
				ServiceConstant.logger.info("AUTHORIZER INFORMATION  {}[{}]  {}[{}]",
						MQ_USERNAME,
						initializedProperties.get(MQ_USERNAME),
						MQ_PASSWORD,
						maskSecret(initializedProperties.get(MQ_PASSWORD)));
			}else if(!initializedProperties.containsKey(MQ_USERNAME) && !initializedProperties.containsKey(MQ_PASSWORD)){
				ServiceConstant.logger.info("NOT DEFINED {}/{} FOR AUTHORIZATION",MQ_USERNAME,MQ_PASSWORD);
			}else{
				// Only one of the two values is present; still never log the password itself.
				ServiceConstant.logger.warn("INVALID authorizer information  {}[{}]  {}[{}]",
						MQ_USERNAME,
						initializedProperties.get(MQ_USERNAME),
						MQ_PASSWORD,
						maskSecret(initializedProperties.get(MQ_PASSWORD)));
			}
			
			if(!plugins.isEmpty()){
				broker.setPlugins(plugins.toArray(new BrokerPlugin[0]));
			}
			// JMX stays enabled, but no separate JMX connector is created.
			ManagementContext context = new ManagementContext();
			context.setCreateConnector(false);
			broker.setManagementContext(context);
			broker.start();
			return broker;
		} catch (Exception e) {
			// Unchecked exceptions propagate as they are; checked ones are wrapped with their cause.
			Throwables.throwIfUnchecked(e);
			throw new RuntimeException(e);
		}
	}
	/**
	 * Runs the super class initialization and then creates the ActiveMQ message queue
	 * factory from {@code initializedProperties}.
	 * @return always {@code true}
	 */
	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Override
	boolean init() {
		// NOTE(review): the result of super.init() is ignored — confirm this is intended.
		super.init();
		this.factory = MessageQueueFactorys.getFactory(MessageQueueType.ACTIVEMQ).init((Map)initializedProperties);
		return true;
	}
}
